/*
 * Copyright (c) 2010-2025. Axon Framework
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.axonframework.serialization.avro;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.message.SchemaStore;
import org.axonframework.common.annotations.Internal;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.serialization.ChainingContentTypeConverter;
import org.axonframework.serialization.ConversionException;
import org.axonframework.serialization.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.UnaryOperator;

/**
 * Converter providing support for <a href="https://avro.apache.org/">Apache Avro</a>, using
 * <a href="https://avro.apache.org/docs/1.11.0/spec.html#single_object_encoding">Single Object Encoded binary
 * encoding</a>.
 * <p>
 * This converter is intended to work for classes, representing messages specified by Avro Schema.
 * It is limited to conversions specifying {@code Class} as a target type (and can not work on pure {@code Type})
 * definitions.
 * </p>
 * <p>
 * The serialization/deserialization is delegated to the {@link AvroConverterStrategy} implementations. By default, the
 * {@link SpecificRecordBaseConverterStrategy} is provided and configured, to support Java classes generated by Avro
 * Maven plugin (delivered by Avro Java distribution). The {@link AvroConverterConfiguration} allows to register further
 * strategies. At least one strategy has to be configured for this converter to work.
 * </p>
 *
 * @author Simon Zambrovski
 * @author Jan Galinski
 * @since 4.11.0
 */
public class AvroConverter implements Converter {

    private static final Logger logger = LoggerFactory.getLogger(AvroConverter.class);

    private final ChainingContentTypeConverter converter;
    private final List<AvroConverterStrategy> converterStrategies = new ArrayList<>();

    /**
     * Creates the converter instance.
     *
     * @param schemaStore           schema store to use.
     * @param configurationOverride configuration customizer.
     */
    public AvroConverter(
            @Nonnull SchemaStore schemaStore,
            @Nonnull UnaryOperator<AvroConverterConfiguration> configurationOverride
    ) {
        this(schemaStore, configurationOverride, new ChainingContentTypeConverter());
    }

    /**
     * Creates the converter instance.
     *
     * @param schemaStore           schema store to use.
     * @param configurationOverride configuration customizer.
     * @param chainingTypeConverter chaining type converter.
     */
    @Internal
    public AvroConverter(
            @Nonnull SchemaStore schemaStore,
            @Nonnull UnaryOperator<AvroConverterConfiguration> configurationOverride,
            @Nonnull ChainingContentTypeConverter chainingTypeConverter
    ) {
        var config = Objects.requireNonNull(configurationOverride, "the configurationOverride may not be null.")
                            .apply(
                                    new AvroConverterConfiguration(schemaStore)
                            );
        // include default strategy
        if (config.includeDefaultAvroConverterStrategies()) {
            AvroConverterStrategy defaultStrategy = new SpecificRecordBaseConverterStrategy(
                    config.schemaStore(),
                    config.schemaIncompatibilityChecker()
            );
            config = config.addConverterStrategy(defaultStrategy);
        }

        // apply configuration to the strategies
        AvroConverterStrategyConfiguration strategyConfiguration = config.avroConverterStrategyConfiguration();
        config.strategies().forEach(strategy -> strategy.applyStrategyConfiguration(strategyConfiguration));

        this.converterStrategies.addAll(config.strategies());
        if (this.converterStrategies.isEmpty()) {
            throw new IllegalStateException("At least one converter strategy is required, but none were configured.");
        }
        this.converter = chainingTypeConverter;
        this.converter.registerConverter(new ByteArrayToGenericRecordConverter(schemaStore));
    }

    @Override
    public boolean canConvert(@Nonnull Type sourceType,
                              @Nonnull Type targetType) {
        if (logger.isTraceEnabled()) {
            logger.trace("Validating if we can convert from source type [{}] to target type [{}].",
                         sourceType, targetType);
        }
        // no conversion needed
        return sourceType.equals(targetType)
                // e.g. bytes to generic record for upcaster
                || converter.canConvert(sourceType, targetType)
                // any type supported by the strategy to bytes and then by the chain -> serialization
                || (strategyForType(sourceType) && converter.canConvert(byte[].class, targetType))
                // chain to bytes and then to any type supported by a strategy -> deserialization
                || (converter.canConvert(sourceType, byte[].class) && strategyForType(targetType))
                // generic record to any type -> deserialization after upcaster
                || (converter.canConvert(sourceType, GenericRecord.class) && strategyForType(targetType));
    }

    private boolean strategyForType(Type type) {
        if (type instanceof Class) {
            return converterStrategies.stream().anyMatch(it -> it.test((Class<?>) type));
        } else {
            return false; // currently only support types which are classes
        }
    }

    @Nullable
    @Override
    public <T> T convert(@Nullable Object input, @Nonnull Type targetType) {
        if (input == null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Input to convert is null, so returning null immediately.");
            }
            return null;
        }

        if (!(targetType instanceof Class<?> targetClass)) {
            throw new ConversionException("Cannot convert input to " + targetType + ", target type must be a class.");
        }

        Class<?> sourceClass = input.getClass();
        if (sourceClass.equals(targetClass) || targetClass.isAssignableFrom(sourceClass)) {
            if (logger.isTraceEnabled()) {
                logger.trace("Casting given input since source and target type are identical.");
            }
            //noinspection unchecked
            return (T) input;
        }

        if (strategyForType(sourceClass)) { // for serialization
            // input must be supported by the strategy
            // target is either byte[].class orr can be converted to it by the chain
            if (byte[].class.equals(targetType)) {
                return serializeByStrategy(input, sourceClass);
            } else if (converter.canConvert(byte[].class, targetType)) {
                return converter.convert(serializeByStrategy(input, sourceClass), targetType);
            } else {
                throw new ConversionException("Cannot convert given input to target type [" + targetType + "]");
            }
        } else if (strategyForType(targetClass)) { // for de-serialization

            if (byte[].class.equals(sourceClass)) {
                //   straight from the source:
                //     input is byte[].class
                //     target type must be supported by the strategy
                return deserializeByStrategy((byte[]) input, targetClass);
            } else if (converter.canConvert(sourceClass, byte[].class)) {
                //   straight from the source:
                //     input is something converted to byte[].class by the chain
                //     target type must be supported by the strategy

                //noinspection DataFlowIssue
                return deserializeByStrategy(converter.convert(input, byte[].class), targetClass);
            } else if (GenericRecord.class.isAssignableFrom(sourceClass)) {
                //   through upcaster / intermediate representation:
                //     2. after upcaster
                //      input is GenericRecord (upcasted)
                //      target type must be supported by the strategy
                return deserializeByStrategy((GenericRecord) input, targetClass);
            } else {
                throw new ConversionException("Cannot convert given input to target type [" + targetType + "]");
            }
        } else if (GenericRecord.class.equals(targetType)) {
            //   through upcaster / intermediate representation:
            //     1. before upcaster
            //      input is byte[] or something converted to it by the chain
            //      target is GenericRecord -> use chain to convert

            if (converter.canConvert(sourceClass, GenericRecord.class)) {
                //noinspection unchecked
                return (T) converter.convert(input, GenericRecord.class);
            } else {
                throw new ConversionException("Cannot convert given input to target type [" + targetType + "]");
            }
        } else {
            throw new ConversionException("Cannot convert given input to target type [" + targetType + "]");
        }
    }

    private <T> T serializeByStrategy(@Nonnull Object input, @Nonnull Class<?> sourceType) {
        return (T) converterStrategies
                .stream()
                .filter(it -> it.test(sourceType))
                .findFirst()
                .orElseThrow(() -> new ConversionException(
                        "Could not find converter strategy to convert from source type ["
                                + sourceType
                                + "]"
                ))
                .convertToSingleObjectEncoded(input);
    }

    private <T> T deserializeByStrategy(@Nonnull byte[] input, @Nonnull Class<?> targetType) {
        //noinspection unchecked
        return (T) converterStrategies
                .stream()
                .filter(it -> it.test(targetType))
                .findFirst()
                .orElseThrow(() -> new ConversionException(
                        "Could not find converter strategy to convert from bytes to target type ["
                                + targetType
                                + "]"))
                .convertFromSingleObjectEncoded(input, targetType);
    }

    private <T> T deserializeByStrategy(@Nonnull GenericRecord input, @Nonnull Class<?> targetType) {
        //noinspection unchecked
        return (T) converterStrategies
                .stream()
                .filter(it -> it.test(targetType))
                .findFirst()
                .orElseThrow(() -> new ConversionException(
                        "Could not find converter strategy to convert from GenericRecord to target type ["
                                + targetType
                                + "]"))
                .convertFromGenericRecord(input, targetType);
    }

    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        for (AvroConverterStrategy strategy : this.converterStrategies) {
            strategy.describeTo(descriptor);
        }
    }
}
